package fun.easycode.datastream;

import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;

import java.util.List;

/**
 * 消息队列选择器
 *  选择消息队列的时候，根据outputTableName在集合的位置进行选择
 *  主要是为了保证消息的顺序性, 还有就是提高性能, 否则一个队列也可以
 * @author xuzhen97
 */
public class MessageQueueSelectorImpl implements MessageQueueSelector {
    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        // org 必须传入outputTableName
        int mqSize = mqs.size();
        String outputTableName = (String) arg;
        int indexOf = DataContext.indexOf(outputTableName);
        return mqs.get(indexOf % mqSize);
    }
}